02 开发一个多 Agent 协同客服系统
开发一个多 Agent 协同客服系统
场景设定:
用户提问:“我的订单为什么还没发货?”
系统拆解任务:
Agent A:查询订单状态(调用内部系统)
Agent B:检查物流信息(调用第三方 API)
Agent C:汇总结果并生成回复
技术难点:
如何设计 Agent 之间的通信协议?
如何保证失败重试机制?
如何可视化 Agent 之间的交互过程
好的,很高兴能帮助您开发一个多 Agent 协同客服系统。这是一个非常实用的项目,可以大大提升客户服务的效率和体验。
了解需求
您希望构建一个多 Agent 协同客服系统,用于处理用户提出的客服问题。您提供了一个具体场景:“我的订单为什么还没发货?”,并拆解了相应的 Agent 职责:
- Agent A: 查询订单状态(调用内部系统)
- Agent B: 检查物流信息(调用第三方 API)
- Agent C: 汇总结果并生成回复
您也提出了三个关键的技术难点:
- 如何设计 Agent 之间的通信协议?
- 如何保证失败重试机制?
- 如何可视化 Agent 之间的交互过程?
我将针对这些问题提供详细的解决方案和代码示例。
概略介绍解决方案
我们将采用一个中心化的协调者(Orchestrator)模式来管理 Agent 之间的通信和任务流。每个 Agent 都是一个独立的模块,负责执行特定任务,并通过协调者进行消息传递。
系统工作原理概述:
- 用户提出问题,例如:“我的订单为什么还没发货?”
- 协调者接收到用户问题,并根据预设规则(或通过自然语言处理)识别出需要激活的 Agent。
- 协调者将任务分配给 Agent A(查询订单状态)。
- Agent A 执行查询,并将结果返回给协调者。
- 协调者判断是否需要 Agent B 介入,如果需要,将任务分配给 Agent B(检查物流信息),并可能将 Agent A 的部分结果作为输入传递给 Agent B。
- Agent B 执行查询,并将结果返回给协调者。
- 协调者在收到所有必要信息后,将所有相关结果传递给 Agent C(汇总结果并生成回复)。
- Agent C 根据收到的信息生成最终的用户回复,并将其返回给协调者。
- 协调者将最终回复呈现给用户。
开发步骤:
我们将分以下几个步骤来实现:
- 定义 Agent 接口和基类: 确保所有 Agent 都遵循统一的结构。
- 实现协调者(Orchestrator): 负责任务分发、结果汇总和通信管理。
- 实现 Agent A (查询订单状态): 模拟内部系统调用。
- 实现 Agent B (检查物流信息): 模拟第三方 API 调用。
- 实现 Agent C (汇总结果并生成回复): 模拟智能回复生成。
- 设计 Agent 之间的通信协议: 使用消息队列或事件总线。
- 实现失败重试机制: 在协调者或 Agent 内部处理。
- 可视化 Agent 交互过程: 使用日志和简单的状态图。
假设和限制:
- 我们将使用 Python 作为开发语言。
- 内部系统和第三方 API 调用将用模拟数据代替,以简化示例。
- 自然语言处理部分将简化,主要侧重于 Agent 之间的协同。
- 可视化将是基于文本的日志输出,而不是复杂的图形界面。
展示代码和实现说明
我们将从基础结构开始,逐步构建系统。
1. 定义 Agent 接口和基类
首先,我们定义一个抽象基类 Agent,所有具体的 Agent 都将继承它。这将确保它们都有一个 execute 方法,用于执行其核心逻辑。
import abc
class Agent(abc.ABC):
"""
Agent 的抽象基类。
所有具体的 Agent 都应继承此基类并实现 execute 方法。
"""
def __init__(self, name: str):
self.name = name
@abc.abstractmethod
def execute(self, task_data: dict) -> dict:
"""
执行 Agent 的核心逻辑。
Args:
task_data (dict): 包含 Agent 执行任务所需输入数据的字典。
Returns:
dict: 包含 Agent 执行结果的字典。
"""
pass
def __str__(self):
return f"Agent({self.name})"
设计思路:
- abc.ABC 用于定义抽象基类,强制子类实现 execute 方法。
- __init__ 方法用于初始化 Agent 的名称。
- execute 方法是 Agent 的核心,它接收一个 task_data 字典作为输入,并返回一个包含执行结果的字典。这种字典结构允许灵活地传递各种类型的数据。
2. 实现协调者(Orchestrator)
协调者是系统的核心,负责管理 Agent 的生命周期、任务分配、结果收集和通信。
Python
import time
import uuid
from typing import Dict, Type, Any, Callable
class Orchestrator:
"""
负责协调不同 Agent 之间任务流的调度器。
它管理 Agent 注册、任务分发、结果收集和错误处理。
"""
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.task_queue: list = [] # 简单队列,实际生产环境可用消息队列
self.results: Dict[str, Any] = {}
self.event_log: list = [] # 用于记录交互过程
def register_agent(self, agent: Agent):
"""
注册一个 Agent 到协调器。
"""
if agent.name in self.agents:
print(f"警告: Agent '{agent.name}' 已注册,将被覆盖。")
self.agents[agent.name] = agent
self._log_event(f"Agent '{agent.name}' 已注册。")
def _log_event(self, event_description: str, task_id: str = None, agent_name: str = None, data: Any = None):
"""
记录 Agent 交互事件,用于可视化和调试。
"""
timestamp = time.time()
log_entry = {
"timestamp": timestamp,
"description": event_description,
"task_id": task_id,
"agent_name": agent_name,
"data": data
}
self.event_log.append(log_entry)
print(f"[日志] {time.strftime('%H:%M:%S', time.localtime(timestamp))} - {event_description}")
def process_user_query(self, query: str) -> str:
"""
处理用户查询,启动 Agent 协同流程。
"""
task_id = str(uuid.uuid4())
self.results[task_id] = {}
self._log_event(f"收到用户查询: '{query}'", task_id=task_id)
# 示例:根据查询内容初步判断需要激活哪些 Agent
# 实际应用中,这里会集成 NLP 或更复杂的规则引擎
if "订单" in query or "发货" in query:
self._log_event(f"识别到订单/发货相关查询,启动 Agent A 和 Agent B", task_id=task_id)
self.task_queue.append({"agent": "AgentA", "task_id": task_id, "input": {"query": query}})
self.task_queue.append({"agent": "AgentB", "task_id": task_id, "input": {"query": query}})
else:
self._log_event(f"无法识别查询类型,直接交给 Agent C 回复", task_id=task_id)
self.task_queue.append({"agent": "AgentC", "task_id": task_id, "input": {"query": query, "status": "无法识别", "logistics": "无信息"}})
# 执行任务队列中的 Agent
while self.task_queue:
task_item = self.task_queue.pop(0) # FIFO
agent_name = task_item["agent"]
current_task_id = task_item["task_id"]
task_input = task_item["input"]
if agent_name not in self.agents:
self._log_event(f"错误: 未找到 Agent '{agent_name}'", task_id=current_task_id)
continue
agent = self.agents[agent_name]
self._log_event(f"正在执行 Agent '{agent.name}' (任务ID: {current_task_id})", task_id=current_task_id, agent_name=agent.name, data=task_input)
try:
# 尝试执行 Agent,并实现重试机制
retries = 3
for attempt in range(retries):
try:
agent_result = agent.execute(task_input)
self._log_event(f"Agent '{agent.name}' 执行成功,结果: {agent_result}", task_id=current_task_id, agent_name=agent.name, data=agent_result)
self.results[current_task_id].update({agent.name: agent_result}) # 汇总结果
break # 成功则跳出重试循环
except Exception as e:
self._log_event(f"Agent '{agent.name}' 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
if attempt < retries - 1:
time.sleep(1) # 重试前等待
else:
self._log_event(f"Agent '{agent.name}' 最终执行失败,任务ID: {current_task_id}", task_id=current_task_id, agent_name=agent.name, data={"error": "最终失败"})
self.results[current_task_id].update({agent.name: {"error": "执行失败", "details": str(e)}})
# 如果某个关键 Agent 失败,可能需要中断或退回,这里简化为继续
except Exception as e:
self._log_event(f"Orchestrator 处理 Agent '{agent.name}' 时发生意外错误: {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
self.results[current_task_id].update({agent.name: {"error": "协调器处理错误", "details": str(e)}})
# 所有前置 Agent 执行完毕后,将结果传递给 Agent C 生成回复
final_result_for_agent_c = self.results.get(task_id, {})
self._log_event(f"将所有 Agent 结果汇总给 Agent C: {final_result_for_agent_c}", task_id=task_id, agent_name="AgentC")
# 确保 AgentC 被执行
if "AgentC" not in self.agents:
self._log_event(f"错误: 未注册 AgentC,无法生成回复", task_id=task_id)
return "系统错误:无法生成回复。"
try:
retries = 3
final_response = "系统错误:无法生成回复。"
for attempt in range(retries):
try:
agent_c_result = self.agents["AgentC"].execute(final_result_for_agent_c)
final_response = agent_c_result.get("response", "未能生成有效回复。")
self._log_event(f"Agent C 生成最终回复: {final_response}", task_id=task_id, agent_name="AgentC", data=agent_c_result)
break
except Exception as e:
self._log_event(f"Agent C 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
if attempt < retries - 1:
time.sleep(1)
return final_response
except Exception as e:
self._log_event(f"Orchestrator 处理 Agent C 时发生意外错误: {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
return f"系统在生成回复时遇到错误: {e}"
def get_event_log(self) -> list:
"""
获取所有记录的事件日志。
"""
return self.event_log
设计思路:
-
__init__: 初始化注册的 Agent 字典、任务队列、结果存储和事件日志。
-
register_agent: 用于将具体的 Agent 实例添加到协调器中。
-
_log_event: 这是一个关键方法,用于记录系统内部的所有重要事件,包括 Agent 的启动、完成、失败等。它将用于后续的可视化。
-
process_user_query:
这是整个系统的入口点。
- 它为每个用户查询生成一个唯一的 task_id,以便跟踪。
- 任务拆解与分发: 示例中通过简单的关键字匹配 ("订单" 或 "发货") 来决定激活哪些 Agent。在实际应用中,这里会集成更复杂的 NLP 模型(如意图识别、实体抽取)来智能地分配任务。
- 任务队列 (self.task_queue): 这是一个简单的列表,用于存储待执行的任务。在生产环境中,这通常会替换为异步消息队列(如 RabbitMQ, Kafka),以支持解耦、削峰和异步处理。
- 结果汇总 (self.results): 用于存储不同 Agent 执行的结果。
- 失败重试机制: 在 while self.task_queue: 循环中,每次执行 Agent 时,都尝试 retries 次。如果 Agent 抛出异常,会等待一段时间后重试。
-
Agent 之间的通信协议: 这里的通信协议是隐式的,通过协调者传递 task_data 字典来实现。每个 Agent 接收输入字典,并返回输出字典。协调者负责将一个 Agent 的输出作为另一个 Agent 的输入。
-
get_event_log: 提供访问事件日志的接口。
3. 实现 Agent A (查询订单状态)
模拟内部系统查询,例如数据库查询。
Python
import random
class AgentA(Agent):
"""
模拟查询订单状态的 Agent。
"""
def __init__(self):
super().__init__("AgentA")
def execute(self, task_data: dict) -> dict:
order_id = task_data.get("order_id") # 假设从用户查询中提取到订单ID
if not order_id:
# 模拟从用户查询中尝试识别订单号
query = task_data.get("query", "")
if "订单号" in query: # 简单模拟提取
parts = query.split("订单号")
if len(parts) > 1:
order_id = parts[1].strip().split(" ")[0] # 提取第一个单词作为订单号
if not order_id:
# 随机生成一个模拟订单号用于演示
order_id = f"ORDER_{random.randint(1000, 9999)}"
print(f"AgentA: 正在查询订单 {order_id} 的状态...")
time.sleep(random.uniform(0.5, 1.5)) # 模拟查询耗时
# 模拟内部系统查询结果
statuses = ["已付款待发货", "已发货", "已完成", "已取消"]
random_status = random.choice(statuses)
# 模拟随机失败
if random.random() < 0.1: # 10% 的概率失败
raise Exception("内部系统查询失败:数据库连接超时。")
return {"order_id": order_id, "status": random_status}
设计思路:
- 继承 Agent 基类。
- execute 方法接收 task_data,尝试从其中提取 order_id。如果用户查询中没有明确提供,则模拟生成一个。
- time.sleep 模拟网络延迟或数据库查询时间。
- random.choice 模拟不同的订单状态。
- 失败模拟: if random.random() < 0.1: 模拟了 Agent 在执行过程中可能遇到的失败,这将触发协调者的重试机制。
4. 实现 Agent B (检查物流信息)
模拟调用第三方物流 API。
Python
class AgentB(Agent):
"""
模拟检查物流信息的 Agent。
"""
def __init__(self):
super().__init__("AgentB")
def execute(self, task_data: dict) -> dict:
order_id = task_data.get("order_id")
if not order_id:
# 如果 Agent A 没有提供,尝试从用户查询中提取
query = task_data.get("query", "")
if "订单号" in query:
parts = query.split("订单号")
if len(parts) > 1:
order_id = parts[1].strip().split(" ")[0]
if not order_id:
# 随机生成一个模拟订单号用于演示
order_id = f"ORDER_{random.randint(1000, 9999)}"
print(f"AgentB: 正在检查订单 {order_id} 的物流信息...")
time.sleep(random.uniform(0.7, 2.0)) # 模拟 API 调用耗时
# 模拟第三方物流 API 结果
logistics_info = {
"tracking_number": f"TRK{random.randint(100000, 999999)}",
"last_update": f"2025-06-{random.randint(10, 16)} 14:30",
"location": random.choice(["分拣中心", "运输中", "派送中", "已签收"]),
"estimated_delivery": f"2025-06-{random.randint(17, 20)}"
}
# 模拟随机失败
if random.random() < 0.15: # 15% 的概率失败
raise Exception("第三方物流 API 调用失败:服务不可用。")
return {"order_id": order_id, "logistics": logistics_info}
设计思路:
- 与 Agent A 类似,从 task_data 中获取 order_id。
- time.sleep 模拟 API 调用延迟。
- logistics_info 模拟第三方 API 返回的数据结构。
- 失败模拟: 模拟第三方 API 调用的失败情况。
5. 实现 Agent C (汇总结果并生成回复)
这个 Agent 负责整合所有信息,并生成一个友好的回复。
Python
class AgentC(Agent):
"""
汇总所有 Agent 结果并生成回复的 Agent。
"""
def __init__(self):
super().__init__("AgentC")
def execute(self, task_data: dict) -> dict:
print(f"AgentC: 正在汇总结果并生成回复...")
time.sleep(0.3) # 模拟处理时间
agent_a_result = task_data.get("AgentA", {})
agent_b_result = task_data.get("AgentB", {})
order_id = agent_a_result.get("order_id", agent_b_result.get("order_id", "未知订单"))
status = agent_a_result.get("status", "未知状态")
logistics = agent_b_result.get("logistics", {})
response_parts = []
response_parts.append(f"关于您的订单 {order_id},")
if "error" in agent_a_result:
response_parts.append(f"查询订单状态时遇到问题:{agent_a_result['error']}。")
else:
response_parts.append(f"当前状态显示为:'{status}'。")
if "error" in agent_b_result:
response_parts.append(f"查询物流信息时遇到问题:{agent_b_result['error']}。")
else:
if logistics:
response_parts.append(f"最新物流更新:您的包裹目前在'{logistics.get('location', '未知位置')}',追踪号为'{logistics.get('tracking_number', '无')}'。")
response_parts.append(f"预计送达日期:{logistics.get('estimated_delivery', '未知')}。")
else:
response_parts.append(f"暂无详细物流信息。")
response_parts.append("感谢您的耐心等待!")
final_response = " ".join(response_parts)
# 模拟随机失败
if random.random() < 0.05: # 5% 的概率失败
raise Exception("回复生成服务故障。")
return {"response": final_response}
设计思路:
- 从 task_data 中获取 AgentA 和 AgentB 的结果。
- 根据获取到的信息(包括可能的错误信息),动态生成用户友好的回复。
- 这个 Agent 是整个流程的收尾,将所有碎片信息整合。
- 失败模拟: 模拟回复生成过程中的失败。
6. 设计 Agent 之间的通信协议
如前所述,我们采用了基于协调者的隐式通信协议。
- 数据结构: 所有 Agent 之间的通信都通过 dict 数据类型进行。协调者将上一个 Agent 的输出(一个 dict)作为下一个 Agent 的输入(task_data 字典)的一部分。
- 消息传递: 协调者的 task_queue 充当一个简化的消息队列。当一个 Agent 完成任务后,它的结果会被协调者存储,并在需要时传递给下一个 Agent。
- 统一接口: Agent 抽象基类强制所有 Agent 实现 execute(task_data: dict) -> dict 接口,确保了通信的标准化。
优点:
- 简单易实现: 对于小型系统,这种方式足够。
- 集中控制: 协调者对整个流程有完全的控制权,便于调试和监控。
- 灵活数据传递: 字典结构允许传递任意复杂的数据。
缺点(对于大规模分布式系统):
- 单点故障: 协调者是核心,如果它崩溃,整个系统将停摆。
- 可伸缩性: 协调者可能成为瓶颈,尤其是在处理大量并发请求时。
- 解耦性: Agent 之间虽然不直接通信,但它们依然依赖于协调者的存在。
改进方案(对于生产环境):
- 消息队列(如 RabbitMQ, Kafka): 每个 Agent 订阅自己感兴趣的消息,并将结果发布到另一个主题。协调者只负责初始任务分发和最终结果聚合。这将大大提高系统的解耦性、可伸缩性和容错性。
- API Gateway/微服务: 每个 Agent 作为一个独立的微服务,通过 RESTful API 或 gRPC 进行通信。协调者调用这些服务并编排流程。
- 共享数据库/KV 存储: Agent 可以将结果写入共享存储,其他 Agent 从中读取所需数据。
7. 失败重试机制
我们已经在 Orchestrator 的 process_user_query 方法中实现了简单的失败重试机制。
Python
try:
# 尝试执行 Agent,并实现重试机制
retries = 3 # 定义重试次数
for attempt in range(retries):
try:
agent_result = agent.execute(task_input)
# ... 成功处理 ...
break # 成功则跳出重试循环
except Exception as e:
self._log_event(f"Agent '{agent.name}' 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
if attempt < retries - 1:
time.sleep(1) # 重试前等待1秒
else:
self._log_event(f"Agent '{agent.name}' 最终执行失败,任务ID: {current_task_id}", task_id=current_task_id, agent_name=agent.name, data={"error": "最终失败"})
self.results[current_task_id].update({agent.name: {"error": "执行失败", "details": str(e)}})
except Exception as e:
# ... 处理协调器内部的错误 ...
实现说明:
- 重试循环: 在 Orchestrator 中,每个 Agent 的 execute 调用都被包裹在一个 for 循环中,允许进行多次尝试 (retries 次)。
- 异常捕获: 使用 try...except 块来捕获 Agent 执行时可能抛出的任何异常。
- 间隔重试: time.sleep(1) 在每次重试前增加短暂的延迟,避免立即重试导致的服务压力。
- 日志记录: 每次尝试和最终失败都会被记录到事件日志中,便于跟踪问题。
- 最终失败处理: 如果所有重试都失败,会记录最终的失败状态,并可能在结果中标记错误,以便 Agent C 进行处理。
更高级的重试策略:
- 指数退避: 每次重试的等待时间逐渐增加(例如,1s, 2s, 4s, 8s),以避免对失败的服务造成过多压力。
- 熔断器模式: 当某个 Agent 连续失败达到一定阈值时,暂时停止对其的调用,避免浪费资源和进一步恶化问题。一段时间后才尝试恢复调用。
- 死信队列: 对于无法处理的消息或任务,将其发送到死信队列,以便后续分析和手动干预。
8. 可视化 Agent 之间的交互过程
我们通过 Orchestrator 中的 _log_event 方法和 event_log 列表来实现简单的可视化。
Python
class Orchestrator:
# ...
def _log_event(self, event_description: str, task_id: str = None, agent_name: str = None, data: Any = None):
"""
记录 Agent 交互事件,用于可视化和调试。
"""
timestamp = time.time()
log_entry = {
"timestamp": timestamp,
"description": event_description,
"task_id": task_id,
"agent_name": agent_name,
"data": data
}
self.event_log.append(log_entry)
print(f"[日志] {time.strftime('%H:%M:%S', time.localtime(timestamp))} - {event_description}")
# ...
实现说明:
- 每次关键操作(如接收查询、Agent 启动、Agent 完成、Agent 失败)都会调用 _log_event。
- 日志条目包含时间戳、描述、任务 ID、Agent 名称和相关数据,提供详细的上下文信息。
- print 语句将日志实时输出到控制台。
- event_log 列表收集所有日志条目,允许在流程结束后进行分析。
简单的可视化示例(运行代码后查看控制台输出):
当您运行系统时,控制台将输出类似以下内容的日志:
[日志] 10:41:40 - 收到用户查询: '我的订单为什么还没发货?' (任务ID: ...)
[日志] 10:41:40 - 识别到订单/发货相关查询,启动 Agent A 和 Agent B (任务ID: ...)
[日志] 10:41:40 - 正在执行 Agent 'AgentA' (任务ID: ...)
AgentA: 正在查询订单 ORDER_xxxx 的状态...
[日志] 10:41:41 - Agent 'AgentA' 执行成功,结果: {'order_id': 'ORDER_xxxx', 'status': '已付款待发货'} (任务ID: ...)
[日志] 10:41:41 - 正在执行 Agent 'AgentB' (任务ID: ...)
AgentB: 正在检查订单 ORDER_xxxx 的物流信息...
[日志] 10:41:42 - Agent 'AgentB' 执行成功,结果: {'order_id': 'ORDER_xxxx', 'logistics': {...}} (任务ID: ...)
[日志] 10:41:42 - 将所有 Agent 结果汇总给 Agent C: {...} (任务ID: ...)
[日志] 10:41:42 - 正在执行 Agent 'AgentC' (任务ID: ...)
AgentC: 正在汇总结果并生成回复...
[日志] 10:41:42 - Agent C 生成最终回复: 关于您的订单 ORDER_xxxx,当前状态显示为:'已付款待发货'。最新物流更新:... 感谢您的耐心等待! (任务ID: ...)
如果发生错误,您会看到类似:
[日志] 10:41:45 - Agent 'AgentA' 执行失败 (尝试 1/3): 内部系统查询失败:数据库连接超时。 (任务ID: ...)
[日志] 10:41:46 - 正在执行 Agent 'AgentA' (任务ID: ...)
AgentA: 正在查询订单 ORDER_xxxx 的状态...
[日志] 10:41:47 - Agent 'AgentA' 执行成功,结果: {...} (任务ID: ...)
更高级的可视化:
- 图表工具: 使用 Mermaid、Graphviz 等工具生成流程图或状态图,根据 event_log 动态绘制 Agent 间的交互流程。
- Web Dashboard: 构建一个简单的 Web 界面,实时显示 Agent 状态、任务队列、错误日志和交互流程的可视化。
- 分布式追踪系统: 集成 OpenTracing 或 OpenTelemetry,将 Agent 之间的调用链路进行追踪和可视化(例如使用 Jaeger)。
完整代码示例
将所有组件组合在一起:
Python
import abc
import time
import uuid
import random
from typing import Dict, Type, Any, Callable
# 1. 定义 Agent 接口和基类
class Agent(abc.ABC):
"""
Agent 的抽象基类。
所有具体的 Agent 都应继承此基类并实现 execute 方法。
"""
def __init__(self, name: str):
self.name = name
@abc.abstractmethod
def execute(self, task_data: dict) -> dict:
"""
执行 Agent 的核心逻辑。
Args:
task_data (dict): 包含 Agent 执行任务所需输入数据的字典。
Returns:
dict: 包含 Agent 执行结果的字典。
"""
pass
def __str__(self):
return f"Agent({self.name})"
# 2. 实现协调者(Orchestrator)
class Orchestrator:
"""
负责协调不同 Agent 之间任务流的调度器。
它管理 Agent 注册、任务分发、结果收集和错误处理。
"""
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.task_queue: list = [] # 简单队列,实际生产环境可用消息队列
self.results: Dict[str, Any] = {}
self.event_log: list = [] # 用于记录交互过程
def register_agent(self, agent: Agent):
"""
注册一个 Agent 到协调器。
"""
if agent.name in self.agents:
print(f"警告: Agent '{agent.name}' 已注册,将被覆盖。")
self.agents[agent.name] = agent
self._log_event(f"Agent '{agent.name}' 已注册。")
def _log_event(self, event_description: str, task_id: str = None, agent_name: str = None, data: Any = None):
"""
记录 Agent 交互事件,用于可视化和调试。
"""
timestamp = time.time()
log_entry = {
"timestamp": timestamp,
"description": event_description,
"task_id": task_id,
"agent_name": agent_name,
"data": data
}
self.event_log.append(log_entry)
print(f"[日志] {time.strftime('%H:%M:%S', time.localtime(timestamp))} - {event_description}")
def process_user_query(self, query: str) -> str:
"""
处理用户查询,启动 Agent 协同流程。
"""
task_id = str(uuid.uuid4())
self.results[task_id] = {}
self._log_event(f"收到用户查询: '{query}'", task_id=task_id)
# 示例:根据查询内容初步判断需要激活哪些 Agent
# 实际应用中,这里会集成 NLP 或更复杂的规则引擎
if "订单" in query or "发货" in query:
self._log_event(f"识别到订单/发货相关查询,启动 Agent A 和 Agent B", task_id=task_id)
# 添加 Agent A 和 Agent B 的任务
self.task_queue.append({"agent": "AgentA", "task_id": task_id, "input": {"query": query}})
self.task_queue.append({"agent": "AgentB", "task_id": task_id, "input": {"query": query}})
else:
self._log_event(f"无法识别查询类型,直接交给 Agent C 回复", task_id=task_id)
# 对于无法识别的查询,直接给 Agent C 一个通用输入
self.task_queue.append({"agent": "AgentC", "task_id": task_id, "input": {"query": query, "AgentA": {"status": "未知"}, "AgentB": {"logistics": {}}}})
# 执行任务队列中的 Agent
# 这里需要注意,Agent B 可能需要 Agent A 的输出,所以需要确保 Agent A 先执行
# 在这个简单的队列中,我们通过先添加 Agent A 再添加 Agent B 来保证顺序
# 更复杂的依赖管理需要有向无环图 (DAG) 或专门的工作流引擎
processed_agents = set() # 记录已处理的 Agent
while self.task_queue:
# 找到一个可以执行的任务 (其依赖的 Agent 已经处理完毕)
current_task_item = None
for i, task_item in enumerate(self.task_queue):
agent_name = task_item["agent"]
# 简化依赖:Agent B 依赖 Agent A,Agent C 依赖 Agent A 和 Agent B
if agent_name == "AgentA":
current_task_item = self.task_queue.pop(i)
break
elif agent_name == "AgentB":
if "AgentA" in processed_agents:
# 传递 AgentA 的结果给 AgentB
task_item["input"].update(self.results[task_item["task_id"]].get("AgentA", {}))
current_task_item = self.task_queue.pop(i)
break
elif agent_name == "AgentC":
# AgentC 依赖所有前置 Agent,所以等队列为空或只剩AgentC时再执行
if len(self.task_queue) == 1 and self.task_queue[0]["agent"] == "AgentC":
current_task_item = self.task_queue.pop(i)
break
if not current_task_item:
# 如果没有可执行的任务,且队列不为空,说明存在循环依赖或逻辑问题
# 对于本例,表示前置Agent还没执行完
if self.task_queue and len(processed_agents) < len(self.agents) -1: # -1 因为 AgentC是最后执行
time.sleep(0.1) # 等待一下,看是否有Agent完成
continue # 继续循环查找可执行任务
else:
break # 队列为空或者只剩下 AgentC 但其依赖还未完全满足,跳出
agent_name = current_task_item["agent"]
current_task_id = current_task_item["task_id"]
task_input = current_task_item["input"]
if agent_name not in self.agents:
self._log_event(f"错误: 未找到 Agent '{agent_name}'", task_id=current_task_id)
continue
agent = self.agents[agent_name]
self._log_event(f"正在执行 Agent '{agent.name}' (任务ID: {current_task_id})", task_id=current_task_id, agent_name=agent.name, data=task_input)
try:
# 尝试执行 Agent,并实现重试机制
retries = 3
for attempt in range(retries):
try:
agent_result = agent.execute(task_input)
self._log_event(f"Agent '{agent.name}' 执行成功,结果: {agent_result}", task_id=current_task_id, agent_name=agent.name, data=agent_result)
self.results[current_task_id].update({agent.name: agent_result}) # 汇总结果
processed_agents.add(agent.name) # 标记为已处理
break # 成功则跳出重试循环
except Exception as e:
self._log_event(f"Agent '{agent.name}' 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
if attempt < retries - 1:
time.sleep(1) # 重试前等待
else:
self._log_event(f"Agent '{agent.name}' 最终执行失败,任务ID: {current_task_id}", task_id=current_task_id, agent_name=agent.name, data={"error": "最终失败"})
self.results[current_task_id].update({agent.name: {"error": "执行失败", "details": str(e)}})
# 即使失败,也标记为已处理,否则可能卡住
processed_agents.add(agent.name)
except Exception as e:
self._log_event(f"Orchestrator 处理 Agent '{agent.name}' 时发生意外错误: {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
self.results[current_task_id].update({agent.name: {"error": "协调器处理错误", "details": str(e)}})
processed_agents.add(agent.name) # 标记为已处理
# 所有前置 Agent 执行完毕后,将结果传递给 Agent C 生成回复
final_result_for_agent_c = self.results.get(task_id, {})
self._log_event(f"将所有 Agent 结果汇总给 Agent C: {final_result_for_agent_c}", task_id=task_id, agent_name="AgentC")
# 确保 AgentC 被执行
if "AgentC" not in self.agents:
self._log_event(f"错误: 未注册 AgentC,无法生成回复", task_id=task_id)
return "系统错误:无法生成回复。"
try:
retries = 3
final_response = "系统错误:无法生成回复。"
for attempt in range(retries):
try:
agent_c_result = self.agents["AgentC"].execute(final_result_for_agent_c)
final_response = agent_c_result.get("response", "未能生成有效回复。")
self._log_event(f"Agent C 生成最终回复: {final_response}", task_id=task_id, agent_name="AgentC", data=agent_c_result)
break
except Exception as e:
self._log_event(f"Agent C 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
if attempt < retries - 1:
time.sleep(1)
return final_response
except Exception as e:
self._log_event(f"Orchestrator 处理 Agent C 时发生意外错误: {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
return f"系统在生成回复时遇到错误: {e}"
def get_event_log(self) -> list:
"""
获取所有记录的事件日志。
"""
return self.event_log
# 3. 实现 Agent A (查询订单状态)
class AgentA(Agent):
"""
模拟查询订单状态的 Agent。
"""
def __init__(self):
super().__init__("AgentA")
def execute(self, task_data: dict) -> dict:
order_id = task_data.get("order_id")
if not order_id:
query = task_data.get("query", "")
# 模拟从用户查询中尝试识别订单号
import re
match = re.search(r'(订单号|订单|order ID|ID)[:\s]*(\w+)', query, re.IGNORECASE)
if match:
order_id = match.group(2)
else:
order_id = f"ORDER_{random.randint(1000, 9999)}" # 随机生成一个模拟订单号用于演示
print(f"AgentA: 正在查询订单 {order_id} 的状态...")
time.sleep(random.uniform(0.5, 1.5)) # 模拟查询耗时
statuses = ["已付款待发货", "已发货", "已完成", "已取消"]
random_status = random.choice(statuses)
# 模拟随机失败
if random.random() < 0.2: # 增加失败概率来演示重试
raise Exception("内部系统查询失败:数据库连接超时。")
return {"order_id": order_id, "status": random_status}
# 4. 实现 Agent B (检查物流信息)
class AgentB(Agent):
"""
模拟检查物流信息的 Agent。
"""
def __init__(self):
super().__init__("AgentB")
def execute(self, task_data: dict) -> dict:
order_id = task_data.get("order_id")
if not order_id:
# 如果 Agent A 没有提供,尝试从用户查询中提取
query = task_data.get("query", "")
import re
match = re.search(r'(订单号|订单|order ID|ID)[:\s]*(\w+)', query, re.IGNORECASE)
if match:
order_id = match.group(2)
else:
order_id = f"ORDER_{random.randint(1000, 9999)}"
print(f"AgentB: 正在检查订单 {order_id} 的物流信息...")
time.sleep(random.uniform(0.7, 2.0)) # 模拟 API 调用耗时
logistics_info = {
"tracking_number": f"TRK{random.randint(100000, 999999)}",
"last_update": f"2025-06-{random.randint(10, 16)} 14:30",
"location": random.choice(["分拣中心", "运输中", "派送中", "已签收"]),
"estimated_delivery": f"2025-06-{random.randint(17, 20)}"
}
# 模拟随机失败
if random.random() < 0.25: # 增加失败概率来演示重试
raise Exception("第三方物流 API 调用失败:服务不可用。")
return {"order_id": order_id, "logistics": logistics_info}
# 5. 实现 Agent C (汇总结果并生成回复)
class AgentC(Agent):
"""
汇总所有 Agent 结果并生成回复的 Agent。
"""
def __init__(self):
super().__init__("AgentC")
def execute(self, task_data: dict) -> dict:
print(f"AgentC: 正在汇总结果并生成回复...")
time.sleep(0.3) # 模拟处理时间
agent_a_result = task_data.get("AgentA", {})
agent_b_result = task_data.get("AgentB", {})
# 优先从 AgentA 获取订单ID,如果A失败则从B获取,最后是未知
order_id = agent_a_result.get("order_id") or agent_b_result.get("order_id") or "未知订单"
response_parts = []
response_parts.append(f"关于您的订单 {order_id},")
# 处理 Agent A 的结果
if "error" in agent_a_result:
response_parts.append(f"查询订单状态时遇到问题:{agent_a_result['error']}。")
else:
status = agent_a_result.get("status", "未知状态")
response_parts.append(f"当前状态显示为:'{status}'。")
# 处理 Agent B 的结果
if "error" in agent_b_result:
response_parts.append(f"查询物流信息时遇到问题:{agent_b_result['error']}。")
else:
logistics = agent_b_result.get("logistics", {})
if logistics:
response_parts.append(f"最新物流更新:您的包裹目前在'{logistics.get('location', '未知位置')}',追踪号为'{logistics.get('tracking_number', '无')}'。")
response_parts.append(f"预计送达日期:{logistics.get('estimated_delivery', '未知')}。")
else:
response_parts.append(f"暂无详细物流信息。")
response_parts.append("感谢您的耐心等待!")
final_response = " ".join(response_parts)
# 模拟随机失败
if random.random() < 0.05:
raise Exception("回复生成服务故障。")
return {"response": final_response}
# 主程序入口
if __name__ == "__main__":
orchestrator = Orchestrator()
# 注册所有 Agent
orchestrator.register_agent(AgentA())
orchestrator.register_agent(AgentB())
orchestrator.register_agent(AgentC())
print("\n--- 场景一:正常流程 ---")
user_query_1 = "我的订单号是 123456789,为什么还没发货?"
response_1 = orchestrator.process_user_query(user_query_1)
print(f"\n用户提问: '{user_query_1}'")
print(f"系统回复: '{response_1}'")
print("\n-----------------------------------\n")
# 重置日志和结果
orchestrator.event_log = []
orchestrator.results = {}
orchestrator.task_queue = []
print("\n--- 场景二:模拟 Agent A 和 Agent B 失败重试 ---")
user_query_2 = "我的订单为什么还没发货?" # 不带订单号,看Agent如何处理
response_2 = orchestrator.process_user_query(user_query_2)
print(f"\n用户提问: '{user_query_2}'")
print(f"系统回复: '{response_2}'")
print("\n-----------------------------------\n")
# 重置日志和结果
orchestrator.event_log = []
orchestrator.results = {}
orchestrator.task_queue = []
print("\n--- 场景三:无法识别的查询 ---")
user_query_3 = "你好,请问你们的工作时间?"
response_3 = orchestrator.process_user_query(user_query_3)
print(f"\n用户提问: '{user_query_3}'")
print(f"系统回复: '{response_3}'")
print("\n-----------------------------------\n")
# 可以选择打印完整的事件日志进行分析
# print("\n--- 完整的事件日志 ---")
# for entry in orchestrator.get_event_log():
# print(entry)
实现说明:
- 执行顺序的简化处理: 在 Orchestrator 的 process_user_query 中,为了简化,我通过在 self.task_queue 中添加 Agent 的顺序来暗示其执行顺序(Agent A -> Agent B -> Agent C)。
- 更完善的依赖管理: 在实际生产环境中,为了处理复杂的 Agent 依赖关系(例如,Agent B 只有在 Agent A 成功后才能启动),您需要一个更强大的工作流引擎,或者使用有向无环图(DAG)来定义任务依赖,例如 Apache Airflow、Prefect 等。
- 订单号提取: 在 Agent A 和 Agent B 中,我增加了简单的正则表达式来从用户查询中提取订单号,以使模拟更真实。
通过运行这个完整的代码,您将看到一个多 Agent 协同客服系统的基本框架,包括 Agent 间的通信、失败重试和简单的可视化日志。您可以根据需要扩展每个 Agent 的功能,并改进协调者的逻辑。